/**
 * 
 */
package co.recloud.ariadne.store;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

import co.recloud.ariadne.model.Host;
import co.recloud.ariadne.model.logical.Transaction;
import co.recloud.ariadne.queue.GossipQueue;
import co.recloud.ariadne.queue.OutboundRequestQueue;
import co.recloud.ariadne.request.DataRequest;
import co.recloud.ariadne.request.KeyUpdateGossipRequest;
import co.recloud.ariadne.request.Request;
import co.recloud.ariadne.response.DataResponse;
import co.recloud.ariadne.response.Response;
import com.sun.tools.javac.util.Pair;

/**
 * @author alex
 * 
 */
public class TransactionCache {

	private static ConcurrentMap<Long, TransactionCache> singletons = new ConcurrentHashMap<Long, TransactionCache>();
	private ConcurrentMap<Long, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>> data, writeData;

	protected TransactionCache() {
		data = new ConcurrentHashMap<Long, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>>();
		writeData = new ConcurrentHashMap<Long, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>>();
	}

	public static TransactionCache getInstance(Long transactionId) {
        if(!singletons.containsKey(transactionId)) {
		    singletons.putIfAbsent(transactionId, new TransactionCache());
        }
		return singletons.get(transactionId);
	}

	private boolean checkDataPath(Transaction transaction, String schema,
			String columnFamily, String key, boolean create) {
		return checkPath(data, transaction, schema, columnFamily, key, create);
	}

	private boolean checkPath(
			ConcurrentMap<Long, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>> data,
			Transaction transaction, boolean create) {
		Long transactionId = transaction.getId();
		if (create) {
            if(!data.containsKey(transactionId)) {
                data.putIfAbsent(
                        transactionId,
                        new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>());
            }
		} else if (!data.containsKey(transactionId)
				|| data.get(transactionId) == null) {
			return false;
		}
		return true;
	}

	private boolean checkPath(
			ConcurrentMap<Long, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>> data,
			Transaction transaction, String schema, boolean create) {
		if (checkPath(data, transaction, create)) {
			Long transactionId = transaction.getId();
			if (create) {
                if(!data.get(transactionId).containsKey(schema)) {
                    data.get(transactionId)
                            .putIfAbsent(
                                    schema,
                                    new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>());
                }
			}
			boolean hasSchema = data.get(transactionId).containsKey(schema)
					&& data.get(transactionId).get(schema) != null;
			if (!hasSchema && !create) {
				return false;
			}
			return true;
		}
		return false;
	}

	private boolean checkPath(
			ConcurrentMap<Long, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>> data,
			Transaction transaction, String schema, String columnFamily,
			boolean create) {
		if (checkPath(data, transaction, schema, create)) {
			Long transactionId = transaction.getId();
			if (create) {
                if(!data.get(transactionId).get(schema).containsKey(columnFamily)) {
                    data.get(transactionId)
                            .get(schema)
                            .putIfAbsent(
                                    columnFamily,
                                    new ConcurrentHashMap<String, ConcurrentMap<String, Object>>());
                }
			}
			boolean hasColumnFamily = data.get(transactionId).get(schema)
					.containsKey(columnFamily)
					&& data.get(transactionId).get(schema).get(columnFamily) != null;
			if (!hasColumnFamily) {
				return false;
			}
			return true;
		}
		return false;
	}

	private boolean checkPath(
			ConcurrentMap<Long, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, Object>>>>> data,
			Transaction transaction, String schema, String columnFamily,
			String key, boolean create) {
		if (checkPath(data, transaction, schema, columnFamily, create)) {
			Long transactionId = transaction.getId();
			boolean hasKey = data.get(transactionId).get(schema)
					.get(columnFamily).containsKey(key)
					&& data.get(transactionId).get(schema).get(columnFamily)
							.get(key) != null;
			if (!hasKey && create) {
				data.get(transactionId).get(schema).get(columnFamily)
						.put(key, new ConcurrentHashMap<String, Object>());
			} else if (!hasKey) {
				return false;
			}
			return true;
		}
		return false;
	}

	/**
	 * Get the value of a single column for a single record. Does not support
	 * SuperColumns
	 * 
	 * @param columnFamily
	 *            The column family name
	 * @param key
	 *            The key
	 * @param column
	 *            The column name -- MUST NOT be a super-column
	 * @param resultType
	 *            The result type
	 * @return The value
	 */
	@SuppressWarnings("rawtypes")
	public Object getColumn(Transaction transaction,
			String schema, String columnFamily, String key, String column,
			Class resultType, boolean isResultSet) {
		Object result = null;
		Set<String> columns = new HashSet<String>();
		columns.add(column);
		if(!isResultSet) {
			checkKey(transaction, schema, columnFamily, key, columns);
		}
		if (checkDataPath(transaction, schema, columnFamily, key, false)) {
			result = data.get(transaction.getId()).get(schema)
					.get(columnFamily).get(key).get(column);
		}
		return result;
	}

	public Map<String, Object> getAllColumns(
			Transaction transaction, String schema, String columnFamily,
			Object key, boolean isResultSet) {
		Map<String, Object> result = null;
		if(!isResultSet) {
			checkKey(transaction, schema, columnFamily, (String) key, null);
		}
		if (checkPath(data, transaction, schema, columnFamily, (String) key,
				false)) {
			result = data.get(transaction.getId()).get(schema)
					.get(columnFamily).get(key);
		}
		return result;
	}

	/**
	 * Get the String value of a single column for a single record. Does not
	 * support SuperColumns
	 * 
	 * @param columnFamily
	 *            The column family name
	 * @param key
	 *            The key
	 * @param column
	 *            The column name -- MUST NOT be a super-column
	 * @return The value
	 */
	public String getString(Transaction transaction,
			String schema, String columnFamily, String key, String column, boolean isResultSet) {
		return (String) getColumn(transaction, schema, columnFamily, key,
				column, String.class, isResultSet);
	}

	public Map<String, Long> getAllLongs(Transaction transaction,
			String schema, String columnFamily, Object key, boolean isResultSet) {
		Map<String, Object> rawResults = getAllColumns(transaction, schema,
				columnFamily, key, isResultSet);
		Map<String, Long> result = new ConcurrentHashMap<String, Long>();
		if (rawResults != null) {
			for (String column : rawResults.keySet()) {
				result.put(column, Long.parseLong(rawResults.get(column).toString()));
			}
			return result;
		}
		return null;
	}

	private void checkKey(Transaction transaction, String schema,
			String columnFamily, String key, Set<String> columns) {
		Set<String> pullColumns = new HashSet<String>();
		if (columns != null) {
			pullColumns.addAll(columns);
			if (checkPath(data, transaction, schema, columnFamily, key, false)) {
				pullColumns.removeAll(data.get(transaction.getId()).get(schema)
						.get(columnFamily).get(key).keySet());
			}
		} else {
			pullColumns = null;
		}
		if ((pullColumns == null && !checkPath(data, transaction, schema, columnFamily, key,
				false))
				|| (pullColumns != null && !pullColumns.isEmpty())
				|| !checkPath(data, transaction, schema, columnFamily, key,
						false)) {
			checkPath(data, transaction, schema, columnFamily, key, true);
			Map<String, Object> row = pullData(transaction, schema,
					columnFamily, key, pullColumns);
			if (row != null && !row.isEmpty()) {
				if (!checkPath(writeData, transaction, schema, columnFamily, key, false)) {
					putRow(transaction, schema, columnFamily, key, row);
				} else {
					Map<String, Object> blindRow = writeData
							.get(transaction.getId()).get(schema)
							.get(columnFamily).get(key);
					putRow(transaction, schema, columnFamily, key, row);
					data.get(transaction.getId()).get(schema).get(columnFamily)
							.get(key).putAll(blindRow);
				}
			}
		}
	}

	private void putRow(Transaction transaction, String schema, String columnFamily, String key, Map<String, Object> row) {
		for (String columnName : row.keySet()) {
			Object value = row.get(columnName);
			if (value != null) {
				data.get(transaction.getId()).get(schema)
						.get(columnFamily).get(key)
						.put(columnName, value);
			}
		}
	}
	private String toPath(String schema, String columnFamily, String key) {
		return schema + "." + columnFamily + ":" + key;
	}

	private Map<String, Object> pullData(Transaction transaction,
			String schema, String columnFamily, String key, Set<String> columns) {
		AddressTable at = AddressTable.getInstance();
		HostTable ht = HostTable.getInstance();
		MemoryCache memCache = MemoryCache.getInstance();
		String path = toPath(schema, columnFamily, key);
		Map<String, Object> row = memCache
				.get(path, transaction.getStartTime());
		if (row != null) {
			return row;
		}
		Host host = at.resolve(columnFamily, key);
		List<Host> hosts = new ArrayList<Host>(2);
		//hosts.add(host);
		if (host.getLocation() != null) {
			Host coHost = ht.getBestCoHostForRead(host);
			hosts.add(coHost);
		}
		Queue<Response> responses = new ConcurrentLinkedQueue<Response>();
		List<DataRequest> requests = new ArrayList<DataRequest>(2);
		for (Host target : hosts) {
			DataRequest dataRequest = new DataRequest();
			requests.add(dataRequest);
			dataRequest.setType(DataRequest.GET);
			dataRequest.setTransaction(transaction);
			dataRequest.setSchema(schema);
			dataRequest.setColumnFamily(columnFamily);
			dataRequest.setKey(key);
			dataRequest.setColumns(columns);
			dataRequest.setTarget(target);
			dataRequest.setPrimary(host.getLocation());
			dataRequest.sendBlocking(target);
			if(dataRequest.getResponse() != null) {
				responses.add(dataRequest.getResponse());
			}

		}
		DataResponse[] responseArray = new DataResponse[2];
		if(responses.size() > 0) {
			responseArray[0] = (DataResponse) responses.remove();
		}
		if(responses.size() > 0) {
			responseArray[1] = (DataResponse) responses.remove();
		}
		Long rowTime = null;
		if ((responseArray[1] == null && responseArray[0] != null) || 
				(responseArray[0] != null && responseArray[1] != null && responseArray[0].getTime() > responseArray[1].getTime())) {
			row = responseArray[0].getData();
			rowTime = responseArray[0].getRowTime();
		} else if(responseArray[1] != null){
			row = responseArray[1].getData();
			rowTime = responseArray[1].getRowTime();
		}
		if (row != null) {
			memCache.put(path, rowTime, row);
		}
		return row;
	}

	/**
	 * Set the value of a single column for a single record. Encodes as UTF8
	 * 
	 * @param columnFamily
	 *            The column family name
	 * @param key
	 *            The key
	 * @param value
	 *            The string value to set
	 */
	public void putColumn(Transaction transaction, String schema,
			String columnFamily, String key, String columnName, Object value) {
		checkDataPath(transaction, schema, columnFamily, key, true);
		data.get(transaction.getId()).get(schema).get(columnFamily).get(key)
				.put(columnName, value);
        if(! columnFamily.startsWith("RS")) {
            checkPath(writeData, transaction, schema, columnFamily, key, true);
            writeData.get(transaction.getId()).get(schema).get(columnFamily).get(key)
                    .put(columnName, value);
        }
	}

	public void deleteColumn(Transaction transaction,
			String schema, String columnFamily, String key, String columnName) {
		checkDataPath(transaction, schema, columnFamily, key, true);
		data.get(transaction.getId()).get(schema).get(columnFamily).get(key)
				.remove(columnName);
	}

	public Set<String> getAllKeys(Transaction transaction,
			String schema, String columnFamily, boolean isResultSet) {
		Set<String> result = new HashSet<String>();
		if (isResultSet) {
			if (checkPath(data, transaction, schema, columnFamily, false)) {
				result = data.get(transaction.getId()).get(schema)
						.get(columnFamily).keySet();
			}
		}
		return result;
	}

	public void dropTable(Transaction transaction, String schema,
			String columnFamily) {
		if (checkPath(data, transaction, schema, columnFamily, false)) {
			data.get(transaction.getId()).get(schema).remove(columnFamily);
		}
	}

	public void writeBack(Transaction transaction) {
		Long transactionId = transaction.getId();
		Map<Host, Map<String, Map<String, Map<String, Map<String, Object>>>>> blocks = new HashMap<Host, Map<String, Map<String, Map<String, Map<String, Object>>>>>();
		AddressTable at = AddressTable.getInstance();
		HostTable ht = HostTable.getInstance();
		MemoryCache memCache = MemoryCache.getInstance();
		Set<String> paths = new HashSet<String>();
		if (checkPath(writeData, transaction, false)) {
			for (String schema : writeData.get(transactionId).keySet()) {
				if (writeData.get(transactionId).get(schema) != null) {
					for (String columnFamily : writeData.get(transactionId)
							.get(schema).keySet()) {
						if (writeData.get(transactionId).get(schema)
								.get(columnFamily) != null) {
							for (String key : writeData.get(transactionId)
									.get(schema).get(columnFamily).keySet()) {
								if (writeData.get(transactionId).get(schema)
										.get(columnFamily).get(key) != null) {
                                    Host host = at.resolve(columnFamily, key);
                                    if (!blocks.containsKey(host)) {
                                        blocks.put(
                                                host,
                                                new HashMap<String, Map<String, Map<String, Map<String, Object>>>>());
                                    }
                                    if (!blocks.get(host).containsKey(
                                            schema)) {
                                        blocks.get(host)
                                                .put(schema,
                                                        new HashMap<String, Map<String, Map<String, Object>>>());
                                    }
                                    if (!blocks.get(host).get(schema)
                                            .containsKey(columnFamily)) {
                                        blocks.get(host)
                                                .get(schema)
                                                .put(columnFamily,
                                                        new HashMap<String, Map<String, Object>>());
                                    }
                                    if (!blocks.get(host).get(schema)
                                            .get(columnFamily)
                                            .containsKey(key)) {
                                        blocks.get(host)
                                                .get(schema)
                                                .get(columnFamily)
                                                .put(key,
                                                        new HashMap<String, Object>());
                                    }
                                    blocks.get(host)
                                            .get(schema)
                                            .get(columnFamily)
                                            .get(key)
                                            .putAll(writeData.get(transactionId)
                                                    .get(schema)
                                                    .get(columnFamily)
                                                    .get(key));
                                    memCache.put(
                                            toPath(schema, columnFamily,
                                                    key),
                                            transaction.getCommitTime(),
                                            writeData.get(transactionId)
                                                    .get(schema)
                                                    .get(columnFamily)
                                                    .get(key));
                                    paths.add(toPath(schema, columnFamily,
                                            key));
								}
							}
						}
					}
				}
			}
		}
        BlockingQueue<Request> requestQueue= OutboundRequestQueue.getQueue();
		for (Host host : blocks.keySet()) {
			Set<Host> hosts = new HashSet<Host>();
			Host coHost = ht.getCoReplica(host, host);
			hosts.add(host);
			hosts.add(coHost);
            Host coCoChost = ht.getCoReplica(host, coHost);
            hosts.add(coCoChost);
			for(Host target : hosts) {
				DataRequest dataRequest = new DataRequest();
				dataRequest.setTransaction(transaction);
				dataRequest.setType(DataRequest.PUT);
				dataRequest.setData(blocks.get(host));
				dataRequest.setPrimary(host.getLocation());
				dataRequest.setTarget(target);
				requestQueue.add(dataRequest);
			}
		}
        BlockingQueue<Request> gossipQueue = GossipQueue.getQueue();
		for (String path : paths) {
			KeyUpdateGossipRequest updateGossip = new KeyUpdateGossipRequest();
			updateGossip.setPath(path);
			updateGossip.setUpdatedTime(transaction.getCommitTime());
            gossipQueue.add(updateGossip);
		}
	}

	public void freeTransaction(Transaction transaction) {
		if (checkPath(data, transaction, false)) {
			data.remove(transaction.getId());
		}
        singletons.remove(transaction.getId());
	}

	public Map<String, Object> getColumns(Transaction transaction,
			String schema, String columnFamily, String key, Set<String> columns) {
		Map<String, Object> result = new HashMap<String, Object>();
		checkKey(transaction, schema, columnFamily, (String) key, columns);
		if (checkPath(data, transaction, schema, columnFamily, (String) key,
				false)) {
			for (String column : columns) {
				Object value = data.get(transaction.getId()).get(schema)
						.get(columnFamily).get(key).get(column);
				if (value != null) {
					result.put(column, value);
				}
			}
		}
		return result;
	}
}
